Understanding NATS & NATS Streaming in Depth (Lessons from the Pitfalls)

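Introduction

NATS Server is a high-performance, cloud-native messaging system built on publish/subscribe. It has no message persistence.
NATS Streaming Server is a messaging system built on top of NATS Server that adds message persistence.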

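Pitfalls with NATS Streaming Persistence

The official documentation is not detailed and leaves out many important technical points. After reading it, the usage looked simple, so I went straight to writing code. The publisher was no problem, and the subscriber worked too. But once the subscriber restarted, it never received the messages the publisher had sent while it was down. So much for persistence? I went through the official docs from end to end without finding the answer, and finally found it in the project's readme.md: for a subscriber to resume after a restart, receiving the messages sent while it was down and without duplicates, it must pass the same durableName when calling Subscribe(subject string, cb MsgHandler, opts ...SubscriptionOption) (Subscription, error), and the ClusterID and clientID passed to Connect(stanClusterID, clientID string, options ...Option) (Conn, error) when reconnecting must not change.

To understand the features of NATS and NATS Streaming, read the readme files of both the server and the client carefully, especially the readme of the nats-streaming server. The source code is also worth reading.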
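The resume behavior described above can be modelled with a small in-memory sketch. This is not the NATS Streaming API: toyServer, durableKey, Publish, Resume and Unsubscribe are hypothetical names invented for illustration. The sketch only assumes what the readme states, namely that the server keeps the position of a durable subscription under the combination of client ID and durable name. Starting a new durable at the beginning of the log is a simplification chosen here, not a statement about the server's default start position.

```go
package main

import "fmt"

// durableKey mirrors the readme's point that a durable subscription is
// stored as a combination of the client ID and the durable name.
type durableKey struct {
	clientID    string
	durableName string
}

// toyServer is a hypothetical in-memory stand-in for the streaming server.
type toyServer struct {
	log      []string           // persisted messages, oldest first
	durables map[durableKey]int // index of the next message to deliver
}

func newToyServer() *toyServer {
	return &toyServer{durables: make(map[durableKey]int)}
}

// Publish appends a message to the log whether or not anyone is subscribed.
func (s *toyServer) Publish(msg string) {
	s.log = append(s.log, msg)
}

// Resume returns every message this durable has not seen yet and advances
// its stored position. An unknown key starts at position 0 (a simplification).
func (s *toyServer) Resume(clientID, durableName string) []string {
	k := durableKey{clientID, durableName}
	pos := s.durables[k]
	pending := append([]string(nil), s.log[pos:]...)
	s.durables[k] = len(s.log)
	return pending
}

// Unsubscribe deletes the stored position, so the same names start over
// as a brand-new durable on the next Resume.
func (s *toyServer) Unsubscribe(clientID, durableName string) {
	delete(s.durables, durableKey{clientID, durableName})
}

func main() {
	s := newToyServer()
	s.Publish("m1")
	fmt.Println(s.Resume("client-1", "dur")) // [m1]

	// Published while the subscriber is "down".
	s.Publish("m2")
	s.Publish("m3")

	// Same client ID and durable name: picks up where it left off.
	fmt.Println(s.Resume("client-1", "dur")) // [m2 m3]

	// A different client ID is a different durable and has no saved position.
	fmt.Println(s.Resume("client-2", "dur")) // [m1 m2 m3]
}
```

In this model, changing either the client ID or the durable name loses the saved position, which matches the symptom described above when a restarted subscriber connects under a new identity.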

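Notes on Important Features

  1. In plain NATS, when nobody is subscribed to a subject, messages are simply dropped, so restarting a subscriber loses messages. The fix: either run two or more client instances as a queue subscription (QueueSubscribe), or switch to NATS Streaming.
  2. clientID and durableName are very important in NATS Streaming. For a subscriber to resume after a restart, receiving the messages sent while it was down and without duplicates, it must pass the same durableName when calling Subscribe(subject string, cb MsgHandler, opts ...SubscriptionOption) (Subscription, error), and the ClusterID and clientID passed to Connect(stanClusterID, clientID string, options ...Option) (Conn, error) must not change. When the program shuts down it should call Close, not Unsubscribe: Unsubscribe() deletes the durable subscription on the server side.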

    This client ID links a given connection to its published messages, subscriptions, especially durable subscriptions. Indeed, durable subscriptions are stored as a combination of the client ID and durable name.
    If an application wishes to resume message consumption from where it previously stopped, it needs to create a durable subscription. It does so by providing a durable name, which is combined with the client ID provided when the client created its connection. The server then maintain the state for this subscription even after the client connection is closed.
    Note: The starting position given by the client when restarting a durable subscription is ignored.
    When the application wants to stop receiving messages on a durable subscription, it should close - but not unsubscribe- this subscription. If a given client library does not have the option to close a subscription, the application should close the connection instead.
    When the application wants to delete the subscription, it must unsubscribe it. Once unsubscribed, the state is removed and it is then possible to re-use the durable name, but it will be considered a brand new durable subscription, with the start position being the one given by the client when creating the durable subscription.

  1. A NATS connection can be given a client name, which makes it easy to find each client's statistics under /connz in the monitoring interface.

    // Options that can be passed to Connect.

    // Name is an Option to set the client name.
    func Name(name string) Option {
        return func(o *Options) error {
            o.Name = name
            return nil
        }
    }

    // ConnInfo is the per-connection record reported by /connz.
    type ConnInfo struct {
        Cid            uint64    `json:"cid"`
        IP             string    `json:"ip"`
        Port           int       `json:"port"`
        Start          time.Time `json:"start"`
        LastActivity   time.Time `json:"last_activity"`
        Uptime         string    `json:"uptime"`
        Idle           string    `json:"idle"`
        Pending        int       `json:"pending_bytes"`
        InMsgs         int64     `json:"in_msgs"`
        OutMsgs        int64     `json:"out_msgs"`
        InBytes        int64     `json:"in_bytes"`
        OutBytes       int64     `json:"out_bytes"`
        NumSubs        uint32    `json:"subscriptions"`
        Name           string    `json:"name,omitempty"`
        Lang           string    `json:"lang,omitempty"`
        Version        string    `json:"version,omitempty"`
        TLSVersion     string    `json:"tls_version,omitempty"`
        TLSCipher      string    `json:"tls_cipher_suite,omitempty"`
        AuthorizedUser string    `json:"authorized_user,omitempty"`
        Subs           []string  `json:"subscriptions_list,omitempty"`
    }
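  2. Use . to separate the levels of a subject. NATS allows a subject to contain the slash character /, but NATS Streaming does not, because NATS Streaming uses the subject name as a folder name when persisting messages.

    • A NATS subject can be any non-empty string; a concrete (non-wildcard) subject must not contain the wildcards '*' and '>'.
    • A NATS Streaming subject must not be empty, must not start or end with a dot '.', and must not contain two consecutive dots. Because wildcard subscriptions are not supported yet, it must not contain '*' or '>' either.
  3. NATS Streaming Server actually embeds a NATS Server and acts as a NATS client itself. A NATS Streaming client does not connect to NATS Streaming Server directly; it connects to the embedded NATS Server, and NATS Streaming Server subscribes to client heartbeats to learn whether a NATS Streaming client connection is still alive. This is why clients are strongly advised to call Close explicitly before the program exits.
  4. NATS can hot-reload its configuration: send the SIGHUP signal or run gnatsd -sl reload.
  5. In a development environment you can add the -V flag to see what NATS is doing. There is no need for it in production, where it would write every incoming message to the log.
  6. You can even use the NATS client package to publish messages to NATS Streaming. A NATS client can subscribe to them, but a NATS Streaming client cannot, because the internal subject is different. It is best not to mix the two; it is easy to get wrong.
  7. When the ClusterID a NATS Streaming client provides on connect does not match the ClusterID the server was started with, the client reports a timeout error. Someone found this puzzling and complained about it in https://github.com/nats-io/nats-streaming-server/issues/309, but the maintainers replied that this is expected, and a timeout does make sense.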

    If you provide a cluster ID not used by any of the servers in the network, no server will respond to the client, hence the timeout error message from the client library. If anything, this is an error message that needs to be updated in the client libraries, not in the server.

  8. Gracefully shutting down a ChanSubscribe client, waiting for message processing to finish.

    package main

    import (
        "fmt"
        "os"
        "os/signal"
        "sync"
        "syscall"

        "github.com/nats-io/go-nats"
    )

    func main() {
        n, err := nats.Connect("nats://127.0.0.1:7222",
            nats.Name("test_client"),
            nats.UserInfo("", ""))
        if err != nil {
            panic(err)
        }
        subject := "test"
        msgCh := make(chan *nats.Msg, nats.DefaultMaxChanLen)
        sub, err := n.ChanSubscribe(subject, msgCh)
        if err != nil {
            panic(err)
        }
        wg := sync.WaitGroup{}
        for i := 0; i < 2; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                // msg handler: exits once msgCh is closed and drained
                for msg := range msgCh {
                    fmt.Printf("%s\n", msg.Data)
                }
            }()
        }
        // signal.Notify never blocks on send, so the channel must be buffered
        quit := make(chan os.Signal, 1)
        signal.Notify(quit, syscall.SIGQUIT,
            syscall.SIGTERM,
            syscall.SIGINT,
            syscall.SIGUSR1,
            syscall.SIGUSR2)
        <-quit
        // stop delivery before closing msgCh: a send on a closed channel panics
        sub.Unsubscribe()
        n.Flush()
        n.Close()
        // close msgCh and wait for the workers to finish what is buffered
        close(msgCh)
        wg.Wait()
    }
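The NATS Streaming subject rules from item 2 in the list above can be sketched as a small validation function. validStreamingSubject is a hypothetical helper written for this article, not a function from the server or client library; it simply encodes the rules as stated here (non-empty, no leading or trailing dot, no consecutive dots, no '*' or '>', and no '/'), and the real server may check more or fewer things.

```go
package main

import (
	"fmt"
	"strings"
)

// validStreamingSubject reports whether subject satisfies the NATS Streaming
// rules listed in this article. Hypothetical helper, not part of any NATS API.
func validStreamingSubject(subject string) bool {
	if subject == "" {
		return false
	}
	// A subject must not start or end with a dot.
	if strings.HasPrefix(subject, ".") || strings.HasSuffix(subject, ".") {
		return false
	}
	// Two consecutive dots would create an empty level.
	if strings.Contains(subject, "..") {
		return false
	}
	// Wildcards are not supported, and '/' is rejected because, per the
	// article, the subject is used as a folder name.
	return !strings.ContainsAny(subject, "*>/")
}

func main() {
	subjects := []string{
		"orders.created",
		"orders..created",
		".orders",
		"orders.*",
		"orders/created",
	}
	for _, s := range subjects {
		fmt.Printf("%q valid: %v\n", s, validStreamingSubject(s))
	}
}
```

Checking subjects on the client side like this is only a convenience for failing early; the server remains the authority on what it accepts.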

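Techniques in the NATS Codebase

  1. The very useful Go-style optional-parameter (functional options) pattern, which shows up in many projects.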

    // Option is a function on the options for a connection.
    type Option func(*Options) error

    // Options can be used to create a customized connection.
    type Options struct {
        Url string
        ...
        User string
        Password string
    }

    var DefaultOptions = Options{
        AllowReconnect:   true,
        MaxReconnect:     DefaultMaxReconnect,
        ReconnectWait:    DefaultReconnectWait,
        Timeout:          DefaultTimeout,
        PingInterval:     DefaultPingInterval,
        MaxPingsOut:      DefaultMaxPingOut,
        SubChanLen:       DefaultMaxChanLen,
        ReconnectBufSize: DefaultReconnectBufSize,
        Dialer: &net.Dialer{
            Timeout: DefaultTimeout,
        },
    }

    // Connect will attempt to connect to the NATS system.
    // The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
    // Comma separated arrays are also supported, e.g. urlA, urlB.
    // Options start with the defaults but can be overridden.
    func Connect(url string, options ...Option) (*Conn, error) {
        opts := DefaultOptions
        opts.Servers = processUrlString(url)
        for _, opt := range options {
            if err := opt(&opts); err != nil {
                return nil, err
            }
        }
        return opts.Connect()
    }

    // Options that can be passed to Connect.

    // Name is an Option to set the client name.
    func Name(name string) Option {
        return func(o *Options) error {
            o.Name = name
            return nil
        }
    }
  2. Using a ring buffer to limit the number of messages

    You can view a message log as a ring buffer. Messages are appended to the end of the log. If a limit is set globally for all channels, or specifically for this channel, when the limit is reached, older messages are removed to make room for the new ones.
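A minimal sketch of the ring-buffer view quoted above, assuming a single count-based limit. ringLog, newRingLog, Append and Messages are hypothetical names for illustration; this is not how the server's message store is implemented, only an instance of the idea that older messages are removed to make room once the limit is reached.

```go
package main

import "fmt"

// ringLog is a hypothetical fixed-capacity message log.
type ringLog struct {
	buf   []string // backing storage, len(buf) is the limit
	start int      // index of the oldest message
	count int      // number of messages currently stored
}

// newRingLog creates a log that keeps at most limit messages (limit > 0).
func newRingLog(limit int) *ringLog {
	return &ringLog{buf: make([]string, limit)}
}

// Append adds msg at the end; when the log is full, the oldest message is
// overwritten and the start index moves forward by one.
func (r *ringLog) Append(msg string) {
	if r.count < len(r.buf) {
		r.buf[(r.start+r.count)%len(r.buf)] = msg
		r.count++
		return
	}
	r.buf[r.start] = msg
	r.start = (r.start + 1) % len(r.buf)
}

// Messages returns the stored messages from oldest to newest.
func (r *ringLog) Messages() []string {
	out := make([]string, 0, r.count)
	for i := 0; i < r.count; i++ {
		out = append(out, r.buf[(r.start+i)%len(r.buf)])
	}
	return out
}

func main() {
	msgs := newRingLog(3)
	for _, m := range []string{"m1", "m2", "m3", "m4", "m5"} {
		msgs.Append(m)
	}
	fmt.Println(msgs.Messages()) // [m3 m4 m5]
}
```

The practical consequence for a subscriber is that a message can disappear from the log before it is consumed if the limit is smaller than the backlog.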
    
  3. Using reflect to bind a chan of any type

    // Reject anything that is not a channel.
    chVal := reflect.ValueOf(channel)
    if chVal.Kind() != reflect.Chan {
        return ErrChanArg
    }

    // Receive the next value through reflection.
    val, ok := chVal.Recv()
    if !ok {
        // Channel has most likely been closed.
        return
    }
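The two fragments above can be put together into a self-contained sketch that runs with only the standard library. drain and errChanArg are hypothetical names introduced here (errChanArg stands in for the library's ErrChanArg); the sketch shows the reflect technique itself, not the NATS implementation, and adds a direction check because reflect's Recv panics on a send-only channel.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// errChanArg is a local stand-in for the library's ErrChanArg.
var errChanArg = errors.New("argument needs to be a receivable channel")

// drain receives from a channel of any element type until it is closed,
// passing each value to handle. Hypothetical helper for illustration.
func drain(channel interface{}, handle func(v interface{})) error {
	chVal := reflect.ValueOf(channel)
	if chVal.Kind() != reflect.Chan {
		return errChanArg
	}
	// Recv panics on a send-only channel, so check the direction first.
	if chVal.Type().ChanDir()&reflect.RecvDir == 0 {
		return errChanArg
	}
	for {
		val, ok := chVal.Recv()
		if !ok {
			// Channel has been closed and fully drained.
			return nil
		}
		handle(val.Interface())
	}
}

func main() {
	ints := make(chan int, 3)
	ints <- 1
	ints <- 2
	ints <- 3
	close(ints)

	// The same function works for chan int, chan string, chan *T, ...
	if err := drain(ints, func(v interface{}) { fmt.Println(v) }); err != nil {
		fmt.Println("error:", err)
	}

	// A non-channel argument is rejected instead of panicking.
	fmt.Println(drain("not a channel", nil))
}
```

The cost of this flexibility is that type errors surface at run time rather than compile time, which is why the kind check comes first.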